package com.joysuccess.queue.strategy;

import com.joysuccess.common.utils.SpringUtil;
import com.joysuccess.queue.core.ProducerStrategy;
import com.joysuccess.queue.core.QueuePropertiesHandler;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

import java.util.Properties;

/**
 * @author : zhangqing
 * @Description: kafka 抽象类
 * @date : 2020年06月09日 08:03
 */
public class KafkaProducerStrategy implements ProducerStrategy<Producer> {
    private final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerStrategy.class);

    /**
     * 单例模式：懒汉模式
     */
    private static volatile KafkaProducerStrategy instance;

    public static KafkaProducerStrategy getInstance() {
        if(instance == null){
            synchronized (KafkaProducerStrategy.class){
                if(null == instance){
                    instance = new KafkaProducerStrategy();
                }
            }
        }
        return instance;
    }

    @Override
    public Producer getProducerClient() {
        QueuePropertiesHandler queuePropertiesHandler = SpringUtil.getBean(QueuePropertiesHandler.class);
        // 定义Kafka服务信连接信息
        Properties props = new Properties();
        // Kafka broker连接地址
        props.put("bootstrap.servers",queuePropertiesHandler.getKafkaServers());
        props.put("acks",queuePropertiesHandler.getAcks());
        // producer自动重试失败的发送次数
        props.put("retries",queuePropertiesHandler.getRetries());
        // producer批量发送的基本单位，默认是16384Bytes，即16kB
        props.put("batch.size",queuePropertiesHandler.getBatchSize());
        // 发送线程在检查batch是否ready的时候，判断有没有过期的参数，默认大小是0ms
        props.put("linger.ms",queuePropertiesHandler.getLingerMs());
        // Producer会创建一个BufferPool，totalSize为buffer.memory。
        // pool里会创建很多batch，每个batch大小就是batch.size
        props.put("buffer.memory", queuePropertiesHandler.getBufferMemory());
        props.put("client.id",queuePropertiesHandler.getClientId());
        // 序列化类
        props.put("key.serializer", queuePropertiesHandler.getKeySerializer());
        props.put("value.serializer", queuePropertiesHandler.getValueSerializer());
        ProducerFactory kafkaProducerFactory = new DefaultKafkaProducerFactory(props);
        return kafkaProducerFactory.createProducer();
    }

    @Override
    public void sendRecordToQueue(String topic,String data) {
        getProducerClient().send(new ProducerRecord<String, String>(topic,data), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    LOGGER.error("kafka消息发送失败", metadata.partition(),
                            metadata.offset(),data);
                    e.printStackTrace();
                }
            }
        });
    }
}
